package mq

import (
	"errors"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	"github.com/eapache/go-resiliency/breaker"
)

// Consumer-side connection states.
const (
	// KafkaConsumerConnected means the consumer currently holds a live connection.
	KafkaConsumerConnected string = "connected"
	// KafkaConsumerDisconnected means the consumer has lost its connection.
	KafkaConsumerDisconnected string = "disconnected"
)

// Consumer wraps a sarama-cluster consumer group with connection-state
// tracking and automatic reconnection guarded by a circuit breaker.
// It contains a sync.Mutex, so it must not be copied; use *Consumer.
type Consumer struct {
	hosts    []string          // Kafka broker addresses
	topics   []string          // topics this group subscribes to
	config   *cluster.Config   // cluster consumer configuration
	consumer *cluster.Consumer // underlying consumer; replaced on reconnect
	status   string            // KafkaConsumerConnected / KafkaConsumerDisconnected
	groupID  string            // consumer group id

	breaker    *breaker.Breaker // rate-limits reconnection attempts
	reConnect  chan bool        // signals keepConnect to rebuild the consumer
	statusLock sync.Mutex       // guards status/exit transitions and consumer swaps
	exit       bool             // set true to stop the background loops
}

// KafkaMessageHandler is the callback invoked for each consumed message.
// It returns (commit, err): when commit is true the message's offset is
// marked as processed; when commit is false the offset is not marked and
// a non-nil err is logged by the consume loop.
type KafkaMessageHandler func(message *sarama.ConsumerMessage) (bool, error)

// GetDefaultConsumerConfig returns the default cluster consumer configuration:
// error and rebalance-notification channels enabled, offsets starting at the
// newest message, a 20s group session timeout with a 6s heartbeat interval,
// a 500ms max processing time, a 2 MiB default fetch size, and Kafka
// protocol version 2.0.0.
func GetDefaultConsumerConfig() (config *cluster.Config) {
	cfg := cluster.NewConfig()
	cfg.Version = sarama.V2_0_0_0

	// Surface errors and rebalance notifications on their channels so the
	// consume loop can react to them.
	cfg.Consumer.Return.Errors = true
	cfg.Group.Return.Notifications = true

	// Start from the newest offset when the group has no committed offset.
	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest

	// Liveness: session timeout (kafka session.timeout.ms) and heartbeat
	// interval (kept under a third of the session timeout).
	cfg.Consumer.Group.Session.Timeout = 20 * time.Second
	cfg.Consumer.Group.Heartbeat.Interval = 6 * time.Second

	// Per-message enqueue budget and default fetch size (2 MiB).
	cfg.Consumer.MaxProcessingTime = 500 * time.Millisecond
	cfg.Consumer.Fetch.Default = 1024 * 1024 * 2

	return cfg
}

// StartKafkaConsumer creates a consumer group for the given brokers/topics
// and starts two background goroutines: keepConnect, which rebuilds the
// connection on demand, and consumerMessage, which feeds messages to f.
// A nil config selects GetDefaultConsumerConfig. If the initial connection
// fails, the partially initialized Consumer and the error are returned and
// no goroutines are started.
func StartKafkaConsumer(hosts, topics []string, groupID string, config *cluster.Config, f KafkaMessageHandler) (*Consumer, error) {
	if config == nil {
		config = GetDefaultConsumerConfig()
	}
	consumer := &Consumer{
		hosts:   hosts,
		config:  config,
		status:  KafkaConsumerDisconnected,
		groupID: groupID,
		topics:  topics,
		// Open the breaker after 3 consecutive failures; retry one probe
		// after 3s (half-open).
		breaker:   breaker.New(3, 1, 3*time.Second),
		reConnect: make(chan bool),
		exit:      false,
	}

	// Connect once up front; surface the error to the caller on failure.
	var err error
	if consumer.consumer, err = cluster.NewConsumer(hosts, groupID, topics, consumer.config); err != nil {
		return consumer, err
	}
	consumer.status = KafkaConsumerConnected
	fmt.Printf("kafka consumer connected,topics:%+v,groupId:%+v\n", topics, groupID)
	fmt.Printf("consumer.consumer:%+v\n", consumer.consumer)

	// Keep the connection alive and start consuming in the background.
	go consumer.keepConnect()
	go consumer.consumerMessage(f)
	// err is necessarily nil here; return it explicitly for clarity.
	return consumer, nil
}

// Close marks the consumer as exiting and closes the underlying cluster
// consumer. The exit flag is flipped under statusLock so the background
// loops observe it consistently with status changes.
// NOTE(review): keepConnect blocks on <-c.reConnect and only re-checks
// c.exit after receiving, so that goroutine may linger after Close —
// consider signaling it explicitly (e.g. by closing a dedicated channel).
func (c *Consumer) Close() error {
	c.statusLock.Lock()
	defer c.statusLock.Unlock()
	c.exit = true
	return c.consumer.Close()
}

// keepConnect waits for reconnect requests on c.reConnect and rebuilds the
// cluster consumer, using the circuit breaker to rate-limit attempts.
// It loops until c.exit is set.
// NOTE(review): c.status is read here (and in the breaker-open case below)
// without holding statusLock while other goroutines write it under the lock —
// likely a data race; confirm with -race and consider locking the reads.
// NOTE(review): once Close sets c.exit, this goroutine stays blocked on
// <-c.reConnect until another request arrives, so it can leak.
func (c *Consumer) keepConnect() {
	// Run until the consumer is told to exit.
	for !c.exit {
		select {
		// A reconnect was requested.
		case <-c.reConnect:
			if c.status != KafkaConsumerDisconnected {
				break // already connected; ignore the request
			}
			fmt.Println("kafka reconnecting")
			var consumer *cluster.Consumer
		breakLoop:
			for {
				// The breaker stops us from hammering the brokers after
				// repeated failures.
				err := c.breaker.Run(func() (err error) {
					consumer, err = cluster.NewConsumer(c.hosts, c.groupID, c.topics, c.config)
					return
				})
				switch err {
				case nil:
					// Reconnected: publish the new consumer under the lock,
					// but only if nobody else reconnected first.
					c.statusLock.Lock()
					if c.status == KafkaConsumerDisconnected {
						c.consumer = consumer
						c.status = KafkaConsumerConnected
					}
					c.statusLock.Unlock()
					break breakLoop
				case breaker.ErrBreakerOpen:
					fmt.Println("kafka  consumer connect fail, broker is open")
					// Re-issue the reconnect request in 5s, when the breaker
					// should be half-open again.
					if c.status == KafkaConsumerDisconnected {
						time.AfterFunc(5*time.Second, func() {
							c.reConnect <- true // retry the reconnect
						})
					}
					break breakLoop
				default:
					// Transient failure below the breaker threshold: loop
					// and try again immediately.
					fmt.Printf("kafka reconnect error:%+v", err)
				}
			}
		}
	}
}

// consumerMessage pumps messages from the cluster consumer into the handler f.
// While disconnected it sleeps 5s between status checks instead of consuming.
// Each time a connection is available it drains rebalance notifications in a
// helper goroutine, then multiplexes over messages, errors and OS signals:
//   - a message is handed to f; the offset is marked only when f commits
//   - broker-loss errors flip status to disconnected and ask keepConnect to
//     rebuild the consumer
//   - SIGHUP/SIGINT/SIGTERM/SIGQUIT close the consumer and end the loop
func (c *Consumer) consumerMessage(f KafkaMessageHandler) {
	// Register for shutdown signals once, before the loop: the original code
	// re-registered the same channel on every reconnect iteration, which is
	// redundant (signal.Notify appends the channel each call).
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	for !c.exit {
		// Do not consume until the connection is (re)established.
		if c.status != KafkaConsumerConnected {
			time.Sleep(time.Second * 5)
			fmt.Println("kafka consumer status " + c.status)
			continue
		}

		// Drain rebalance notifications for the current consumer instance;
		// the range ends when that instance is closed or replaced.
		go func() {
			for ntf := range c.consumer.Notifications() {
				fmt.Printf("kafka consumer Rebalanced:ntf:%+v\n", ntf)
			}
		}()

	ConsumerLoop:
		for !c.exit {
			select {
			case msg, ok := <-c.consumer.Messages():
				if ok {
					// Hand the message to the callback; mark the offset as
					// processed only when the handler asks for a commit.
					if commit, err := f(msg); commit {
						c.consumer.MarkOffset(msg, "")
					} else if err != nil {
						fmt.Printf("the consumer failed err:%+v\n", err)
					}
				}
			case err := <-c.consumer.Errors():
				fmt.Printf("the consumer failed err:%+v\n", err)
				// Broker connectivity loss: flag disconnected and ask
				// keepConnect to rebuild the consumer. Other errors are
				// logged above and otherwise ignored.
				if errors.Is(err, sarama.ErrOutOfBrokers) || errors.Is(err, sarama.ErrNotConnected) {
					c.statusLock.Lock()
					if c.status == KafkaConsumerConnected {
						c.status = KafkaConsumerDisconnected
						c.reConnect <- true
					}
					c.statusLock.Unlock()
				}
			case s := <-signals:
				fmt.Println("kafka consumer receive system signal " + s.String())
				c.statusLock.Lock()
				c.exit = true
				// Close safely before exiting the loop.
				if err := c.consumer.Close(); err != nil {
					fmt.Println("consumer.Close error")
				}
				c.statusLock.Unlock()
				break ConsumerLoop
			}
		}
	}
}
